package day04;

import beans.SensorReading;
import day03.window.FlinkWindow00;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * Demo job showing Flink side outputs: a stream of {@link SensorReading} is split into a
 * main output (temperature strictly greater than 30) and a side output (everything else).
 *
 * @author lvbingbing
 * @date 2022-01-11 14:54
 */
public class ProcessFunction03 {

    /**
     * Routes each reading either to the main output or to the supplied side-output tag,
     * depending on whether its temperature is strictly greater than 30.
     */
    private static final class TemperatureSplitter extends ProcessFunction<SensorReading, SensorReading> {

        private static final long serialVersionUID = -199627753847617749L;

        /** Tag identifying the side output that receives the non-high readings. */
        private final OutputTag<SensorReading> sideTag;

        private TemperatureSplitter(OutputTag<SensorReading> sideTag) {
            this.sideTag = sideTag;
        }

        @Override
        public void processElement(SensorReading reading, Context context, Collector<SensorReading> collector) throws Exception {
            // Keep the exact "> 30" test so boundary and NaN values are routed as before.
            final boolean aboveThreshold = reading.getTemperature() > 30;
            if (!aboveThreshold) {
                context.output(sideTag, reading);
                return;
            }
            collector.collect(reading);
        }
    }

    public static void main(String[] args) throws Exception {
        // 1. Build the FlinkWindow00 helper with parallelism 1. Per the original author's
        //    note, its parameterized constructor initializes the environment and reads
        //    from a socket text stream (defined outside this file).
        FlinkWindow00 windowHelper = new FlinkWindow00(1);
        // 2. Obtain the execution environment from the helper.
        StreamExecutionEnvironment executionEnv = windowHelper.getEnv();
        // 3. Wire up the side-output demo on the helper's stream.
        studySideOutputStream(windowHelper.getSingleOutputStreamOperator());
        // 4. Trigger job execution.
        executionEnv.execute();
    }

    /**
     * Side output demo: splits the given stream by temperature and prints both halves.
     * Readings with temperature greater than 30 stay on the main output; all others are
     * emitted to the "lowTemperature" side output.
     *
     * @param sensorReadingStream stream of sensor readings to split
     */
    private static void studySideOutputStream(SingleOutputStreamOperator<SensorReading> sensorReadingStream) {
        // The trailing "{}" creates an anonymous subclass, as in the original code.
        final OutputTag<SensorReading> lowTag = new OutputTag<SensorReading>("lowTemperature") {};

        final SingleOutputStreamOperator<SensorReading> mainOutput =
                sensorReadingStream.process(new TemperatureSplitter(lowTag));

        // Fetch the side output before attaching any sink, matching the original order.
        final DataStream<SensorReading> sideOutput = mainOutput.getSideOutput(lowTag);

        mainOutput.print("高温流");
        sideOutput.print("低温流");
    }
}
